# Copyright (c) 2021-2025, PostgreSQL Global Development Group

# Tests for already-propagated WAL segments ending in incomplete WAL records.

use strict;
use warnings;

use File::Copy;
use PostgreSQL::Test::Cluster;
use Test::More;
use Fcntl qw(SEEK_SET);

use integer;    # causes / operator to use integer math

# Values queried from the server
my $WAL_SEGMENT_SIZE;
my $WAL_BLOCK_SIZE;
my $TLI;

# Return the name of a WAL segment file for the given timeline ID and
# segment number.  The name is made of three zero-padded, 8-digit uppercase
# hexadecimal fields: the timeline, a middle field that is always 0 here,
# and the segment number.  Used when filtering the contents of the server
# logs.
sub wal_segment_name
{
	my ($timeline, $segno) = @_;

	return join('', map { sprintf("%08X", $_) } ($timeline, 0, $segno));
}

# Split an LSN (in bytes) into the number of the WAL segment it belongs to
# and its offset within that segment, based on $WAL_SEGMENT_SIZE.  Returns
# the list (segment, offset).  Used when filtering the contents of the
# server logs.
sub lsn_to_segment_and_offset
{
	my ($lsn) = @_;

	# "use integer" is in effect for this file, so the division truncates.
	my $segment = $lsn / $WAL_SEGMENT_SIZE;
	my $offset = $lsn % $WAL_SEGMENT_SIZE;

	return ($segment, $offset);
}

# Look up the GUC $name in pg_settings on $node (database 'postgres') and
# return its "setting" value converted to an integer.
sub get_int_setting
{
	my ($node, $name) = @_;

	my $query = "SELECT setting FROM pg_settings WHERE name = '$name'";
	my $value = $node->safe_psql('postgres', $query);

	return int($value);
}

# Return the LSN (in bytes) of the start of the WAL page containing the
# given LSN, by clearing the bits that hold the offset within the page.
sub start_of_page
{
	my ($lsn) = @_;

	# The mask only isolates the page start if $WAL_BLOCK_SIZE is a power
	# of two, which is assumed here.
	my $page_mask = ~($WAL_BLOCK_SIZE - 1);

	return $lsn & $page_mask;
}

# Create the primary node, initialized with the allows_streaming and
# has_archiving options set.
my $primary = PostgreSQL::Test::Cluster->new('primary');
$primary->init(allows_streaming => 1, has_archiving => 1);

# The configuration is chosen here to minimize the friction with
# concurrent WAL activity.  checkpoint_timeout avoids noise with
# checkpoint activity, and autovacuum is disabled to avoid any
# WAL activity generated by it.
$primary->append_conf(
	'postgresql.conf', qq(
autovacuum = off
checkpoint_timeout = '30min'
wal_keep_size = 1GB
));

# Start the primary and take the backup named 'backup', from which both
# standby nodes are initialized later in this test.
$primary->start;
$primary->backup('backup');

# Create the table whose row count is checked on standby2 at the end of
# the test.  It starts with a single row.
$primary->safe_psql('postgres', "CREATE TABLE t AS SELECT 0");

# Query the server for the values used by the LSN helper routines above,
# and for the timeline ID used to build WAL segment names.
$WAL_SEGMENT_SIZE = get_int_setting($primary, 'wal_segment_size');
$WAL_BLOCK_SIZE = get_int_setting($primary, 'wal_block_size');
$TLI = $primary->safe_psql('postgres',
	"SELECT timeline_id FROM pg_control_checkpoint()");

# Get close to the end of the current WAL page, enough to fit the
# beginning of a record that spans on two pages, generating a
# continuation record.
$primary->emit_wal(0);
my $end_lsn =
  $primary->advance_wal_out_of_record_splitting_zone($WAL_BLOCK_SIZE);

# Do some math to find the record size that will overflow the page, and
# write it.  $overflow_size is the number of bytes left between $end_lsn
# and the end of its page.
my $overflow_size = $WAL_BLOCK_SIZE - ($end_lsn % $WAL_BLOCK_SIZE);
$end_lsn = $primary->emit_wal($overflow_size);

# Stop the primary before modifying its WAL on disk.
$primary->stop('immediate');

# Find the beginning of the page with the continuation record and fill
# the entire page with zero bytes to simulate broken replication.
my $start_page = start_of_page($end_lsn);
my $wal_file = $primary->write_wal($TLI, $start_page, $WAL_SEGMENT_SIZE,
	"\x00" x $WAL_BLOCK_SIZE);

# Copy the file we just "hacked" to the archives.  Fail right away if the
# copy does not work, as the standbys would otherwise never find the
# modified segment and the test would only fail later on a log wait.
copy($wal_file, $primary->archive_dir)
  or die "could not copy \"$wal_file\" to the archives: $!";

# Start standby nodes and make sure they replay the file "hacked" from
# the archives of the primary.  Both are created from the backup named
# 'backup' of the primary, with the standby and has_restoring options set.
my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
$standby1->init_from_backup(
	$primary, 'backup',
	standby => 1,
	has_restoring => 1);

my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
$standby2->init_from_backup(
	$primary, 'backup',
	standby => 1,
	has_restoring => 1);

# Record the size of each standby's log file before starting the nodes.
# These values are passed to wait_for_log() below, presumably as the
# position from which the log contents are scanned.
my $log_size1 = -s $standby1->logfile;
my $log_size2 = -s $standby2->logfile;

$standby1->start;
$standby2->start;

# Build the pattern of the expected log entry, using the segment name and
# the offset within the segment of the page that was filled with zeros.
my ($segment, $offset) = lsn_to_segment_and_offset($start_page);
my $segment_name = wal_segment_name($TLI, $segment);
my $pattern =
  qq(invalid magic number 0000 .* segment $segment_name.* offset $offset);

# We expect both standby nodes to complain about an empty page when trying to
# assemble the record that spans over two pages, so wait for such reports in
# their logs.
$standby1->wait_for_log($pattern, $log_size1);
$standby2->wait_for_log($pattern, $log_size2);

# Now check the case of a promotion with a timeline jump handled at
# page boundary with a continuation record.
$standby1->promote;

# This command forces standby2 to read a continuation record from the page
# that is filled with zero bytes.
$standby1->safe_psql('postgres', 'SELECT pg_switch_wal()');

# Make sure WAL moves forward.
$standby1->safe_psql('postgres',
	'INSERT INTO t SELECT * FROM generate_series(1, 1000)');

# Configure standby2 to stream from just promoted standby1 (it also pulls WAL
# files from the archive).  It should be able to catch up.
$standby2->enable_streaming($standby1);
$standby2->reload;
$standby1->wait_for_replay_catchup($standby2);

# The table was created with one row and 1000 more were inserted after the
# promotion, so standby2 should see 1001 rows once it has caught up.
my $result = $standby2->safe_psql('postgres', "SELECT count(*) FROM t");

# Report the count through note() rather than a bare print, so that the
# message is emitted as a TAP comment instead of raw text on STDOUT.
note "standby2: $result";
is($result, qq(1001), 'check streamed content on standby2');

done_testing();
